-
Notifications
You must be signed in to change notification settings - Fork 168
feat(expr-ir): Support over(*partition_by)
#3224
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: oh-nodes
Are you sure you want to change the base?
Conversation
`expected` is now taken from testing the same selector on `main`
Adopting what polars does is simpler than special-casing
…d-over-and-over-again
Aligning this is not important
Adding `parse_into_selector_ir` will require calling this a lot I'd rather skip using `re` when a more performant option is there
Still have some translations missing `by_index` will mean updating `matches_column` to *also* pass in the schema index
Supports selector input for partitions
- Already works, but I wanna add some optimizations for the single partition case - `pc.unique` can be used directly on a lot of `ChunkedArray` types, but `filter` will drop nulls by default, so needs some care if present
Avoids the need for a tempoary composite key column, by using `dictionary_encode` and generating boolean masks based on index position
Left a comment in `selectors` about this issue earlier
for idx in range(len(arr_dict.dictionary)): | ||
# NOTE: Acero filter doesn't support `null_selection_behavior="emit_null"` | ||
# Is there any reasonable way to do this in Acero? | ||
yield native.filter(pc.equal(pa.scalar(idx), indices)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this for use in over(partition_by=...)
?
if so, just as a heads up, we won't be able to accept a solution which involves looping over partitions in python
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh hi Marco, fancy seeing you here 😄
is this for use in
over(partition_by=...)
?
No this part is just for DataFrame.partition_by("one_column")
.
The multi-column variant of that is run in the c++ engine.
For over(partition_by=...)
I need the partitions put back together and only for a single column vs a full table here.
These are different enough that I'd more likely use union
(41d8cc2) and keeping things threaded will probably make more sense.
I'm also thinking ahead for over(*partition_by, order_by=...)
which will be inserting one of these nodes into the plan
Edit: I've just updated the description to try and be clearer that these are related problems - but only in a conceptual sense
But to clarify
we won't be able to accept a solution which involves looping over partitions in python
This isn't looping over partitions, it is looping over an index into the partition key.
The index is being used to create a mask from indices - which AFAICT isn't expensive.
At the very least, I'm expecting this to be cheaper than what ArrowGroupBy.__iter__
currently does. Which involves casting + concatenating columns, before filtering on the resulting keys.
But the note ...
Is there any reasonable way to do this in Acero?
... is me leaving myself a TODO to try and benchmark that later 😅
Related issues
Expr
IR #2572order_by
,hashjoin
,DataFrame.{filter,join}
,Expr.is_{first,last}_distinct
#3173Notes
pc.unique
preserving orderTasks
by_dtype
naive hashby_dtype
handles bare parametric typesDataFrame.partition_by
pyarrow
) to support more kinds ofover(*partition_by)
are a superset of what's needed for itover
is reduced to adapting vector functions to operate on those partitionsDataFrame.partition_by
Group_by.__iter__
ReuseAdapt concept forover(*partition_by)
(non-aggregating)union
or [pyarrow.concat_tables
ReuseAdapt concept for forGroupBy.agg(<BinaryExpr>)